package com.bawei.persona.realtime.app.func;


import com.alibaba.fastjson.JSON;
import com.alibaba.fastjson.JSONObject;

import com.bawei.persona.realtime.common.GmallConfig;
import com.bawei.persona.realtime.util.CarrierUtils;
import com.bawei.persona.realtime.util.ThreadPoolUtil;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.functions.async.ResultFuture;
import org.apache.flink.streaming.api.functions.async.RichAsyncFunction;
import org.apache.hadoop.hbase.Cell;
import org.apache.hadoop.hbase.CellUtil;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.client.*;
import org.apache.hadoop.hbase.util.Bytes;

import java.text.SimpleDateFormat;
import java.util.Arrays;
import java.util.Date;
import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.ExecutorService;

/**
 * 上海大数据学院
 * 项目规划及管理：李剑
 * 技术指导及需求分析：郭洵
 * 编程：楚志高
 *
 * @author bawei  bigdata sh
 * @since 2021-06-11
 *
 * 思路:将数据传递过来的json 对象 根据不同的表
 * 因为数据保存在json 对象钟可以根据不同的数据据自动进行配置，
 * 所以把这个看成一个可以通用调用的类，在这里面根据不同的表去实现
 *
 * 特别注意，再这里牺牲空间。将将来的额进行用户画像的数据一次性给标签打上
 *
 *
 *
 */
public class DimAsyncFunctionPersona extends RichAsyncFunction<JSONObject, JSONObject> {

    //构造函数传递将来的可能的业务表，与那个维度表进行关联

    private  String talbename ="" ;
    //根据传递的业务把表传递过来

    // 数据库连接

    private transient Connection conn = null;
    private transient Table table = null;
    private transient Table catetable2 = null;
    private transient Table catetable1 = null;





    //线程池对象的父接口生命（多态）
    private ExecutorService executorService;

    @Override
    public void open(Configuration parameters) throws Exception {
        //进行hbase 数据库的打开
        //初始化线程池对象
        System.out.println("初始化线程池对象");
        executorService = ThreadPoolUtil.getInstance();
        //初始化数据库
        org.apache.hadoop.conf.Configuration conf = HBaseConfiguration.create();
        //链接服务器
        conf.set("hbase.zookeeper.quorum", GmallConfig.HBASE_ZOOKEEPER_QUORUM);
        conf.set("hbase.zookeeper.property.clientPort", GmallConfig.HBASE_ZOOKEEPER_PROPERTY_CLIENTPORT);
        if (null == conn) {
            this.conn = ConnectionFactory.createConnection(conf);
        }
        String habseSchema = GmallConfig.HABSE_SCHEMA;
        String table1 = talbename; //表名
        String tablename = habseSchema + "." +table1;
       // open the table
        table = conn.getTable(TableName.valueOf(tablename));



    }

    public DimAsyncFunctionPersona(String talbename) {
        this.talbename = talbename;
    }

    public String  judgeEamal(String email) {
        String emaltype="其他";
        if(email.equalsIgnoreCase("163.com")|| email.equalsIgnoreCase("126.com") ){
            emaltype="网易" ;
        }
        if(email.equalsIgnoreCase("139.com") ){
            emaltype="移动" ;
        }
        if(email.equalsIgnoreCase("sohu.com") ){
            emaltype="搜狐" ;
        }if(email.equalsIgnoreCase("qq.com") ){
            emaltype="qq" ;
        }if(email.equalsIgnoreCase("189.com") ){
            emaltype="tom" ;
        }if(email.equalsIgnoreCase("aliyun.com") ){
            emaltype="阿里" ;
        }
        if(email.equalsIgnoreCase("sina.com") ){
            emaltype="新浪" ;
        }
        return  emaltype;
    }
    @Override
    public void asyncInvoke(JSONObject jsonObject, ResultFuture<JSONObject> resultFuture) throws Exception {
 //对数据库表的结构进行操作
        executorService.submit(
                new Runnable() {
                    @Override
                    public void run() {
                        try {
                            //发送异步请求
                            long start = System.currentTimeMillis();
                            //查询关联的结果值，需要对应的数据进行更新
                            String key ="" ;
                            //从流中事实数据获取key
                            if(GmallConfig.DIM_USER_INFO.equalsIgnoreCase(talbename)) {
                                //这个是去查询基础表中的用户信息，来源与宽表中的userid
                                 key =jsonObject.getString("user_id");
                            }
                            //如果是关联省份维度表
                            if(GmallConfig.DIM_PROVICE.equalsIgnoreCase(talbename)) {
                                //这个是去查询基础表中的用户信息，来源与宽表中的userid
                                key =jsonObject.getString("province_id");
                            }
                            //与维度表sku_info 关联
                            if(GmallConfig.DIM_SKU_INFO.equalsIgnoreCase(talbename)) {
                                //这个是去查询基础表中的用户信息，来源与宽表中的userid
                                key =jsonObject.getString("sku_id");
                            }
                           //与维度表SPU_info 关联
                            if(GmallConfig.DIM_SPU_INFO.equalsIgnoreCase(talbename)) {
                                //这个是去查询基础表中的用户信息，来源与宽表中的userid
                                key =jsonObject.getString("spu_id");
                            }
                            //关联维度3
                            if(GmallConfig.DIM_BASE_CATEGORY3.equalsIgnoreCase(talbename)) {
                                //这个是去查询基础表中的用户信息，来源与宽表中的userid
                                key =jsonObject.getString("category3_id");
                            }
                            // 关联品牌维度
                            if(GmallConfig.DIM_BASE_TRADEMARK.equalsIgnoreCase(talbename)) {
                                //这个是去查询基础表中的用户信息，来源与宽表中的userid
                                key =jsonObject.getString("tm_id");
                            }

                            String table1 = talbename; //表名
                            //根据维度的主键到维度表中进行查询
                            // scan 范围数据
                            String habseSchema = GmallConfig.HABSE_SCHEMA;
                            String tablename = habseSchema + "." +table1;
                            System.out.println( key + "chuzhigao = " + tablename);
                            String dimInfoJsonObj ="";
                            JSONObject jsonObject1 = new JSONObject();
                            if(key != null && key !=""){

//                                table = conn.getTable(TableName.valueOf(tablename));
                                Get get = new Get(Bytes.toBytes(key));
                                Result res = table.get(get);
                                //设置将来转化为的主键 ，将map 值转化为json 对象
                                Map resultMap = new HashMap() ;
                                resultMap.put("id", key) ;
                                for (Cell cell:res.rawCells()){
                                    //根据表明把所有的数据进行包装从json 类型
                                    resultMap.put("tablename", table1) ;
                                    //循环将hbase表中的数据给压缩进去
                                    byte[] bytes = CellUtil.cloneQualifier(cell);
                                    byte[] value = CellUtil.cloneValue(cell);
                                    String pkname  = Bytes.toString(bytes);
                                    String pkvalue  = Bytes.toString(value);
                                    resultMap.put(pkname, pkvalue) ;
                                    //将map 结构转化为json 对象
                                }
                                //得到的维度的json 的字符串
                                 dimInfoJsonObj = JSONObject.toJSONString(resultMap);
                                jsonObject1 = JSON.parseObject(dimInfoJsonObj);

                            }
                            //System.out.println("维度数据Json格式：" + dimInfoJsonObj);
                            if(dimInfoJsonObj != null){
                                //根据不同的业务进行不同的操作
                               //如果对用户进行关联 其中关心的用户指标为 用户的年纪，性别，邮件
                                if("dim_user_info".equalsIgnoreCase(talbename)){
                                    //获取邮件
                                    String email = jsonObject1.getString("email");
                                    if(email != null && email !=""){
                                        String emailGrp = email.substring(email.indexOf("@") +  1 );
                                        //追加宽表中类别email
                                        jsonObject.put("email",emailGrp) ;
                                        String emailChinese = judgeEamal(emailGrp);
                                        jsonObject.put("emailtype", emailChinese) ;
                                    }
                                    //设置性别

                                    String gender = jsonObject1.getString("gender");// == "F" ? "女" : "男";
                                    String genderType ="未知" ;
                                    if(gender.equalsIgnoreCase("F")){
                                        genderType="女";
                                    }
                                    if(gender.equalsIgnoreCase("M")){
                                        genderType="男";
                                    }
                                    //性别的维度
                                    jsonObject.put("user_gender", genderType) ;
                                    jsonObject.put("user_gendertype",jsonObject1.getString("gender") ) ;
                                    // 通过手机号码可以得到运营商，将来可以作为用户画像做准备
                                    String phone_num = jsonObject1.getString("phone_num");
                                    String[] carrierByTel = CarrierUtils.getCarrierByTel(phone_num).split("!");
                                    //讲运营商加入可以的关联维度中
                                    jsonObject.put("carrier", carrierByTel[0]) ;
                                    jsonObject.put("carriername", carrierByTel[1]) ;
                                    //设置年龄段
                                    //获取用户生日
                                    String birthday = jsonObject1.getString("birthday");
                                    //定义日期转换工具类
                                    SimpleDateFormat sdf = new SimpleDateFormat("yyyy-MM-dd");
                                    //将生日字符串转换为日期对象
                                    if(birthday != null && birthday !=""){
                                        Date birthdayDate = sdf.parse(birthday);
                                        //获取生日日期的毫秒数
                                        Long birthdayTs = birthdayDate.getTime();
                                        //获取当前时间的毫秒数
                                        Long curTs = System.currentTimeMillis();
                                        //年龄毫秒数
                                        Long ageTs = curTs - birthdayTs;
                                        //转换为年龄
                                        Long ageLong = ageTs / 1000L / 60L / 60L / 24L / 365L;
                                        Integer newdateinteger = ageLong.intValue();
                                        jsonObject.put("user_age",newdateinteger) ;
                                        String  yearbasetype ="" ;
                                        String  yearbasename ="" ;
                                        //判断
                                        if(newdateinteger >= 70 && newdateinteger < 80){
                                            yearbasetype = "L80";
                                            yearbasename = "40后";

                                        }else if (newdateinteger >= 60 && newdateinteger < 70){
                                            yearbasetype = "L70";
                                            yearbasename = "50后";
                                        }else if (newdateinteger >= 50 && newdateinteger < 60){
                                            yearbasetype = "L60";
                                            yearbasename = "60后";
                                        }else if (newdateinteger >=   40 && newdateinteger < 50){
                                            yearbasetype = "L50";
                                            yearbasename = "70后";
                                        }else if (newdateinteger >= 30 && newdateinteger < 40){
                                            yearbasetype = "L40";
                                            yearbasename = "80后";
                                        }else if (newdateinteger >= 20 && newdateinteger < 30){
                                            yearbasetype = "L30";
                                            yearbasename = "90后";
                                        }else if (newdateinteger >= 10 && newdateinteger < 20){
                                            yearbasetype = "L20";
                                            yearbasename = "00后";
                                        }else if (newdateinteger < 10 ){
                                            yearbasetype = "L10";
                                            yearbasename = "10后";
                                        }
                                        //保存用户的年龄段大小
                                        jsonObject.put("yearbasetype", yearbasetype) ;
                                        jsonObject.put("yearbasename", yearbasename) ;
                                    }
                                     }

                                //如果是维度省份表去进行关联
                                if(GmallConfig.DIM_PROVICE.equalsIgnoreCase(talbename)){
                                    //获取省份名字
                                    String name = jsonObject1.getString("name");
                                    jsonObject.put("province_name",name) ;
                                    //设置区域id
                                    String area_code = jsonObject1.getString("area_code");
                                    jsonObject.put("province_area_code", area_code) ;
                                    //设置iso_code
                                    String iso_code = jsonObject1.getString("iso_code");
                                    jsonObject.put("province_iso_code",iso_code) ;
                                    String iso_3166_2 = jsonObject1.getString("iso_3166_2");
                                    jsonObject.put("province_3166_2_code",iso_3166_2);
                                }
                                //库存量单位表SKU
                                if(GmallConfig.DIM_SKU_INFO.equalsIgnoreCase(talbename)){
                                    //获取商品名称名字
                                    String sku_name = jsonObject1.getString("sku_name");
                                    jsonObject.put("sku_name",sku_name) ;
                                    //产品id
                                    String spu_id = jsonObject1.getString("spu_id");
                                    jsonObject.put("spu_id", spu_id) ;
                                    //设置目录3详细表id
                                    String category3_id = jsonObject1.getString("category3_id");
                                    jsonObject.put("category3_id",category3_id) ;
                                    String tm_id = jsonObject1.getString("tm_id");
                                    jsonObject.put("tm_id",tm_id);
                                    String price = jsonObject1.getString("price");
                                    jsonObject.put("price",price);


                                }
                                //与spu info 进行关联

                                if(GmallConfig.DIM_SPU_INFO.equalsIgnoreCase(talbename)){
                                    //获取商品名称名字
                                    String spu_name = jsonObject1.getString("spu_name");
                                    jsonObject.put("spu_name",spu_name) ;
                                }
                                //关联品维度3
                                if(GmallConfig.DIM_BASE_CATEGORY3.equalsIgnoreCase(talbename)){
                                    //获取商品名称名字
                                    String name = jsonObject1.getString("name");
                                    jsonObject.put("category3_name",name) ;
                                    //特别注意，这里需要对所有的数据依次进行关联出来类型三级分类，将来好按三级进行品牌画像
                                    //根据级联关系挨个查询2次表结构，将3次关联的表自动给赋值进去
                                    //自动获取第二级的主键
                                    /////////////////////////查询第二级类型表开始///////////////////////////////////////////
                                    String category2_id = jsonObject1.getString("category2_id");
                                    //查询一次table 表

                                     catetable2 = conn.getTable(TableName.valueOf("GMALL0820_REALTIME.dim_base_category2"));
                                    //查询结果

                                    Get get2 = new Get(Bytes.toBytes(category2_id));
                                    Result res2 = catetable2.get(get2);

                                    //设置将来转化为的主键 ，将map 值转化为json 对象
                                    Map resultMapCateget2 = new HashMap() ;
                                    resultMapCateget2.put("id", category2_id) ;
                                    for (Cell cell:res2.rawCells()){
                                        //根据表明把所有的数据进行包装从json 类型
                                        resultMapCateget2.put("tablename", "GMALL0820_REALTIME.dim_base_category2") ;
                                        //循环将hbase表中的数据给压缩进去
                                        byte[] bytes = CellUtil.cloneQualifier(cell);
                                        byte[] value = CellUtil.cloneValue(cell);
                                        String pkname  = Bytes.toString(bytes);
                                        String pkvalue  = Bytes.toString(value);
                                        resultMapCateget2.put(pkname, pkvalue) ;
                                        //将map 结构转化为json 对象
                                    }
                                    //得到的维度的json 的字符串
                                    String jsonString = JSONObject.toJSONString(resultMapCateget2);
                                    JSONObject jsonObject2 = JSON.parseObject(jsonString);
                                    //得到所有的第2级的所有数据
                                    // name  category1_id
                                    String category2name = jsonObject2.getString("name");
                                    String category1_id = jsonObject2.getString("category1_id");
                                   // 同时将第二级分类插入相应的object 中去
                                    jsonObject.put("category2_name",category2name) ;
                                    jsonObject.put("category2_id",category2_id) ;
                                    /////////////////////////查询第二级类型表结束///////////////////////////////////////////
                                    /////////////////////////查询第一级类型表开始///////////////////////////////////////////
                                    //查询一次table 表
                                     catetable1 = conn.getTable(TableName.valueOf("GMALL0820_REALTIME.dim_base_category1"));
                                    //查询结果

                                    Get get1 = new Get(Bytes.toBytes(category1_id));
                                    Result res1 = catetable1.get(get1);

                                    //设置将来转化为的主键 ，将map 值转化为json 对象
                                    Map resultMapCateget1 = new HashMap() ;
                                    resultMapCateget1.put("id", category1_id) ;
                                    for (Cell cell:res1.rawCells()){
                                        //根据表明把所有的数据进行包装从json 类型
                                        resultMapCateget1.put("tablename", "GMALL0820_REALTIME.dim_base_category1") ;
                                        //循环将hbase表中的数据给压缩进去
                                        byte[] bytes = CellUtil.cloneQualifier(cell);
                                        byte[] value = CellUtil.cloneValue(cell);
                                        String pkname  = Bytes.toString(bytes);
                                        String pkvalue  = Bytes.toString(value);
                                        resultMapCateget1.put(pkname, pkvalue) ;
                                        //将map 结构转化为json 对象
                                    }
                                    //得到的维度的json 的字符串
                                    String jsonString1 = JSONObject.toJSONString(resultMapCateget1);
                                    JSONObject jsonObjectcategory1 = JSON.parseObject(jsonString1);
                                    //得到所有的第2级的所有数据
                                    // name  category1_id
                                    String category1name = jsonObjectcategory1.getString("name");

                                    // 同时将第二级分类插入相应的object 中去
                                    jsonObject.put("category1_name",category1name) ;
                                    jsonObject.put("category1_id",category1_id) ;
                                    /////////////////////////查询第一级类型表结束///////////////////////////////////////////

                                }
                                //品牌维度
                                if(GmallConfig.DIM_BASE_TRADEMARK.equalsIgnoreCase(talbename)){
                                    //获取商品名称名字
                                    String tm_name = jsonObject1.getString("tm_name");
                                    jsonObject.put("tm_name",tm_name) ;
                                }


                                //维度关联  流中的事实数据和查询出来的维度数据进行关联
//                                join(obj,dimInfoJsonObj);
                            }

                            System.out.println("维度关联后的对象:" + jsonObject);

                            long end = System.currentTimeMillis();
                            System.out.println("异步维度查询耗时" +(end -start)+"毫秒");
                            //将关联后的数据数据继续向下传递
                            resultFuture.complete(Arrays.asList(jsonObject));
                        } catch (Exception e) {
                            e.printStackTrace();
                            throw new RuntimeException(talbename + "维度异步查询失败");
                        }
                    }
                }
        );


    }

    @Override
    public void timeout(JSONObject input, ResultFuture<JSONObject> resultFuture) throws Exception {

    }

    @Override
    public void close() throws Exception {
        super.close();

        if (table != null){
            table.close();
        }
        if (catetable2 != null){
            //关闭打开的table资源
            catetable2.close();
        }
        if (catetable1 != null){
            catetable1.close();

        }




        if (conn != null){
            conn.close();
        }

    }


}
